package com.dahuan.tables.udf;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * Demonstrates a user-defined table function (UDTF) with the Flink Table API and SQL.
 *
 * <p>Reads sensor CSV records ({@code id,timestamp,temperature}), registers a {@link Split}
 * table function that tokenizes the sensor id, and runs the same lateral join both through
 * the Table API ({@code joinLateral}) and through SQL ({@code LATERAL TABLE}), printing
 * both result streams.
 */
public class UDF_TableFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );
        // Use event time as the stream time characteristic.
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        // 2. Read the input file into a DataStream. The path may be overridden via the
        //    first program argument so the job is not tied to one developer's machine;
        //    the original hard-coded path remains the default for backward compatibility.
        String inputPath = args.length > 0
                ? args[0]
                : "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        DataStream<String> inputStream = env.readTextFile( inputPath );

        // 3. Parse each CSV line into a SensorReading POJO.
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            // parseLong/parseDouble instead of the deprecated Long/Double boxing
            // constructors; SensorReading's (Long, Double) parameters autobox them.
            return new SensorReading( fields[0],
                    Long.parseLong( fields[1].trim() ),
                    Double.parseDouble( fields[2].trim() ) );
        } );

        // 4. Convert the stream into a Table, renaming timestamp -> ts and temperature -> temp.
        Table sensorTable = tableEnv.fromDataStream( dataStream, "id, timestamp as ts, temperature as temp" );

        // 5. Table function instance that splits the id on "_" and emits (word, length) rows.
        Split split = new Split( "_" );

        // UDFs must be registered with the table environment before they can be used.
        tableEnv.registerFunction( "split", split );

        // Table API flavor: lateral (cross) join each row against the table function output.
        Table resultTable = sensorTable.joinLateral( "split(id) as (word, length)" )
                .select( "id, ts, word, length" );

        // SQL flavor: the equivalent query expressed with LATERAL TABLE.
        tableEnv.createTemporaryView( "sensor", sensorTable );
        Table resultSQL = tableEnv.sqlQuery( "select id, ts, word, length" +
                " from sensor, lateral table(split(id)) as splitid(word, length)" );

        tableEnv.toAppendStream( resultTable, Row.class ).print( "tableAPI" );
        tableEnv.toAppendStream( resultSQL, Row.class ).print( "SQL" );

        env.execute( "UDF_TableFunction" );
    }

    /**
     * Custom {@link TableFunction}: splits the input string on the configured separator
     * and emits one (token, tokenLength) row per token.
     */
    public static class Split extends TableFunction<Tuple2<String, Integer>> {

        // Separator used by split(); always set by the constructor, so it can be final.
        // NOTE: String.split treats this as a regex — "_" is a literal, so it is safe here.
        private final String separator;

        public Split( String separator ) {
            this.separator = separator;
        }

        /**
         * Flink invokes {@code eval} reflectively; it must be public and returns no value —
         * result rows are emitted via {@code collect}.
         */
        public void eval( String strs ) {
            for (String token : strs.split( separator )) {
                collect( new Tuple2<>( token, token.length() ) );
            }
        }
    }
}
